本文共 8299 字,大约阅读时间需要 27 分钟。
对网站使用情况做报表,如活动数据(page view、查看内容、搜索内容等)和运营数据(CPU、IO使用率、请求时间、服务器日志等)要用到的数据的收集和分析。
Kafka是一个分布式的,基于发布/订阅的消息系统:
消息处理时间不受数据量的影响,即使TB级别的消息也可以保证访问的性能。
高吞吐量 支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输 支持离线数据处理和在线实时数据处理 支持水平扩展
Broker: 一个Kafka集群包含一个或多个borker
topic: 一个broker下包含多个topic partition: 一个topic包含一个或多个partition productor: 生产者 consumer : 消费者 Consumer Group: 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
productor: push
consumer: pull(消费者可以根据自身的能力来进行消费)
Kafka默认保证At least once
,并且允许通过设置Producer异步提交来实现At most once
。而Exactly once要求与外部存储系统协作
,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式。
At most once 消息可能会丢,但绝不会重复传输
At least one 消息绝不会丢,但可能会重复传输 (Kafka默认支持该种方式
) Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
默认情况下一条消息从Producer到broker是确保了At least once
,可通过设置Producer异步发送实现At most once
也就是说默认情况小Kafka
消息是会重复投递的。
zookeeper中保存了consumer 消费的offset;consumer offset
commit的方式有:
获取消息后马上commit:有可能消息得不到处理。
获取消息后先处理再commit:消息至少被处理一遍。At most once
At least one
Exactly once:在At least one
的基础上,从架构设计上(与外部系统结合)保证不重复处理。
Data Replication
Leader Election: 针对partition
而言的
没有Replication的,一旦某一个Broker宕机,则其上所有的Partition数据都不可被消费,这与Kafka数据持久性及Delivery Guarantee的设计目标相悖。同时Producer都不能再将数据存于这些Partition中。
如果Producer使用
同步模式
:尝试重新发送message.send.max.retries(默认值为3)次后抛出Exception
- 停止发送后续数据:造成数据的阻塞
- 继续发送后续数据:造成本应发往该Broker的数据的丢失
如果Producer使用
异步模式
:尝试重新发送message.send.max.retries(默认值为3)次后
- 记录该异常并继续发送后续数据,这会造成数据丢失并且用户只能通过日志发现该问题。
指Replica之间的Leader Election
保证同一个Partition
的多个Replica之间的数据一致性(其中一个宕机后其它Replica必须要能继续服务并且即不能造成数据重复也不能造成数据丢失)。
如何将所有Replica均匀分布到整个集群
- 将所有Broker(假设共n个Broker)和待分配的Partition排序
- 将第i个Partition分配到第(i mod n)个Broker上
- 将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上
Title:消息备份流程producter->partition: (leader,负责读写数据)partition->follower: ACK(先应答[内存中]再写入log)follower->follower内存follower内存->ACK应答ACK应答->followerfollower->logfollower->partition: ISR列表中的所有follower都应答作为partition应答partition->producter: 所有的ISR replica都应答后,leader partition应答
备份同步列表:ISR
(即in-sync Replica)
Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)
同步复制:同步复制要求所有能工作的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率
异步复制:Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。
而Kafka采用的既不是同步复制,也不是异步复制。而是采用ISR的方式来均衡确保数据不丢失以及吞吐率。
领导选举的方式
少数服从多数: 如果我们有2f+1个Replica(包含Leader和Follower),那在commit之前必须保证有f+1个Replica复制完消息,为了保证正确选出新的Leader,fail的Replica不能超过f个。在生产环境下为了保证较高的容错程度,必须要有大量的Replica,而大量的Replica又会在大数据量下导致性能的急剧下降。
Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。
在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
等待ISR中的任一个Replica“活”过来,并且选它作为Leader: 等待时间可能较长。
选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader: 可能会导致数据不全,部分消息丢失。
Title:传统Leader选举clound->Leader: leaderLeader->zk: (leader 宕机)zk->zk节点: znode会自动删除clound->follower: followerfollower->zk节点: 创建节点zk节点->Leader: 竞选leader成功
该方法会有3个问题:
- split-brain 这是由Zookeeper的特性引起的,虽然Zookeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致
- herd effect 如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整
- Zookeeper负载过重 每个Replica都要为此在Zookeeper上注册一个Watch,当集群规模增加到几千个Partition时Zookeeper负载会过重。
Kafka 0.8.*的Leader Election方案解决了上述问题,它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
- Controller在Zookeeper注册Watch,一旦有Broker宕机(这是用宕机代表任何让系统认为其die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的znode会自动被删除,Zookeeper会fire Controller注册的watch,Controller读取最新的幸存的Broker
- Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition
- 对set_p中的每一个Partition
- 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR
- 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。
- 将新的Leader,ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有其version在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1
- 直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。
High Level Consumer
: Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义。
Consumer Group
: High Level Consumer将从某个Partition中读取最后一条消息的offset存于Zookeeper中,这个offset基于客户程序提供给Kafka的名字来保存,这个名字就叫做consumer group。也就是说用来保存consumer offset的名字。 与传统mq的不同:
- 很多传统的Message Queue都会在消息被消费完后将消息删除 (一方面避免重复消费,另一方面可以保证Queue的长度比较短,提高效率)。
- Kafka保证一条消息在同一个consumer group中只消费一次。
- 允许不同的consumer group同时消费同一条消息。
每个Consumer被创建时会触发Consumer Group的Rebalance:
- High Level Cousumer 启动时将ID注册到Zookeeper的其Counsumer group下,路径为/consumers/[consumer group]/ids/[consumer id]
- 在/consumers/[consumer group]/ids上注册Watch
- 在/brokers/ids上注册Watch
- 如果Consumer通过Topic Filter创建消息流,则它会同时在/brokers/topics上也创建Watch
- 强制自己在其Consumer Group内启动Rebalance流程
存在的问题:
- 任何Broker或者Consumer的增减都会触发所有的Consumer的Rebalance。
- 多个consumer同时触发Rebalance,会导致不正确的Rebalance。
需要对partion或者做特殊处理
- 同一条消息读多次
- 只读取某个Topic的部分Partition
- 管理事务,从而确保每条消息被处理一次,且仅被处理一次
与Consumer Group相比,Low Level Consumer要求用户做大量的额外工作。
- 必须在应用程序中跟踪offset,从而确定下一条应该消费哪条消息
- 应用程序需要通过程序获知每个Partition的Leader是谁
- 必须处理Leader的变化
使用Low Level Consumer的一般流程如下
- 查找到一个“活着”的Broker,并且找出每个Partition的Leader
- 找出每个Partition的Follower
- 定义好请求,该请求应该能描述应用程序需要哪些数据
- Fetch数据
- 识别Leader的变化,并对之作出必要的响应
- 由于不同Partition可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。
- 由于Partition在物理上对应一个文件夹,即使多个Partition位于同一个节点,也可通过配置让同一节点上的不同Partition置于不同的disk drive上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。
利用多磁盘的具体方法是,将不同磁盘mount到不同目录,然后在server.properties中,将log.dirs设置为多目录(用逗号分隔)。Kafka会自动将所有Partition尽可能均匀分配到不同目录也即不同目录(也即不同disk)上。
- 多Consumer消费同一个Topic时,同一条消息只会被同一Consumer Group内的一个Consumer所消费。
- 如果Consumer的个数多于Partition的个数,那么会有部分Consumer无法消费该Topic的任何数据,也即当Consumer个数超过Partition后,增加Consumer并不能增加并行度。
Partition个数决定了可能的最大并行度。
CAP理论
CAP理论是指,分布式系统中,一致性、可用性和分区容忍性最多只能同时满足两个。
一致性
- 通过某个节点的写操作结果对后面通过其它节点的读操作可见
- 如果更新数据后,并发访问情况下后续读操作可立即感知该更新,称为强一致性
- 如果允许之后部分或者全部感知不到该更新,称为弱一致性
- 若在之后的一段时间(通常该时间不固定)后,一定可以感知到该更新,称为最终一致性
可用性
- 任何一个没有发生故障的节点必须在有限的时间内返回合理的结果
分区容忍性
- 部分节点宕机或者无法与其它节点通信时,各分区间还可保持分布式系统的功能
一般而言,都要求保证分区容忍性。所以在CAP理论下,更多的是需要在可用性和一致性之间做权衡。
Master-Slave
- RDBMS的读写分离即为典型的Master-Slave方案
- 同步复制可保证强一致性但会影响可用性
- 异步复制可提供高可用性但会降低一致性
基于ISR的数据复制方案
只有ISR中的所有Replica都复制完,Leader才会将其置为Commit,它才能被Consumer所消费。
- 由于Leader可移除不能及时与之同步的Follower,故与同步复制相比可避免最慢的Follower拖慢整体速度,也即ISR提高了系统可用性。
- ISR中的所有Follower都包含了所有Commit过的消息,而只有Commit过的消息才会被Consumer消费,故从Consumer的角度而言,ISR中的所有Replica都始终处于同步状态,从而与异步复制方案相比提高了数据一致性。
- ISR可动态调整,极限情况下,可以只包含Leader,极大提高了可容忍的宕机的Follower的数量。与Majority Quorum方案相比,容忍相同个数的节点失败,所要求的总节点数少了近一半。
顺序写磁盘
- 将Partition分为多个Segment,每个Segment对应一个物理文件;
- 通过删除整个文件的方式去删除Partition内的数据。这种方式清除旧数据的方式,也避免了对文件的随机写操作。
** 充分利用Page Cache **
Page Cache的优势:
- I/O Scheduler会将连续的小块写组装成大块的物理写从而提高性能
- I/O Scheduler会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间
- 充分利用所有空闲内存(非JVM内存)。如果使用应用层Cache(即JVM堆内存),会增加GC负担
- 读操作可直接在Page Cache内进行。如果消费和生产速度相当,甚至不需要通过物理磁盘(直接通过Page Cache)交换数据
- 如果进程重启,JVM内的Cache会失效,但Page Cache仍然可用
存在问题:
- 将数据写入Page Cache,并不保证数据一定完全写入磁盘。
- 可能会造成机器宕机时,Page Cache内的数据未写入磁盘从而造成数据丢失。
解决措施:
- 这种场景完全可以由Kafka层面的Replication机制去解决
- 提供了flush.messages和flush.ms两个参数将Page Cache中的数据强制Flush到磁盘,但是Kafka并不建议使用。(降低性能)
支持多Disk Drive
Broker的log.dirs配置项,允许配置多个文件夹。如果机器上有多个Disk Drive,可将不同的Disk挂载到不同的目录,然后将这些目录都配置到log.dirs里。Kafka会尽可能将不同的Partition分配到不同的目录,也即不同的Disk上,从而充分利用了多Disk的优势。
- 传统模式下的四次拷贝与四次上下文切换
- sendfile和transferTo实现零拷贝
Linux 2.4+内核通过sendfile系统调用,提供了零拷贝。数据通过DMA拷贝到内核态Buffer后,直接通过DMA拷贝到NIC Buffer,无需CPU拷贝。这也是零拷贝这一说法的来源。除了减少数据拷贝外,因为整个读文件-网络发送由一个sendfile调用完成,整个过程只有两次上下文切换,因此大大提高了性能。
注: transferTo和transferFrom并不保证一定能使用零拷贝。实际上是否能使用零拷贝与操作系统相关,如果操作系统提供sendfile这样的零拷贝系统调用,则这两个方法会通过这样的系统调用充分利用零拷贝的优势,否则并不能通过这两个方法本身实现零拷贝。
- 批处理: 批处理既减少了网络传输的Overhead,又提高了写磁盘的效率。
- 数据压缩降低网络负载: 将整个Batch的消息一起压缩后传输。数据压缩的一个基本原理是,重复数据越多压缩效果越好。因此将整个Batch的数据一起压缩能更大幅度减小数据量,从而更大程度提高网络传输效率。
- 高效的序列化方式: 减少实际网络传输和磁盘存储的数据规模,从而提高吞吐率。这里要注意,如果使用的序列化方法太慢,即使压缩比非常高,最终的效率也不一定高。
转载地址:http://tswno.baihongyu.com/