kafka学习之一简单安装以及使用

1. 初识kafka框架

​ kafka是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统(所谓消息系统:用于在不同应用之间传输数据的消息引擎系统)。

其设置初衷是为了解决互联网公司超大量级数据的实时传输,其具备如下能力

  • 高吞吐量/低延时
  • 消息持久化
  • 负载均衡和故障转移
  • 伸缩性

高吞吐量/低延时

  1. kafka消息持久化大量使用页缓存,读写消息大概率可以命中操作系统页缓存(此处需要明晰:kafka的消息持久化是直接追加到磁盘上的(磁盘为了存储快捷使用了页缓存)

  2. kafka不直接参与物理IO,而是由操作系统进行操作

  3. 写消息采用追加的方式,使用了磁盘顺序写(避免了磁盘随机写性能慢的问题)

  4. 读消息使用了liunx的sendfile的零拷贝技术

消息持久化

​ kafka框架设计的时候和别的mq框架不同,别的mq框架对于消息先使用缓存(内存),在一定时机再将缓存持久化到磁盘,而是一反常态直接使用追加写>(利用了磁盘的属性写和内存写性能相同的场景)。 kafka消息持久化是直接持久化到磁盘(顺序写),并没有采用先缓存到内存进而持久化到磁盘,因为顺序写到磁盘的速度和内存随机写的速度相当。

负载均衡和故障转移

​ kafka实现负载均衡是通过分区领导者选举(partition leader election)来实现的,使得机器上以均等的机会分散各个partition的leader从而整体上实现负载均衡。

​ kafka实现故障转移使用所有服务节点都会以会话的形式将自己注册到zk上,服务器出现宕机情况则会出现会话超时失效,此时kafka集群会选举出新的机器来提供服务。

伸缩性

​ 伸缩性是指向某个分布式系统添加(伸)和去除(缩)服务资源从而动态改变其吞吐量的能力。阻碍线性扩容的常见因素之一是服务节点之间状态的保存,服务器之间需要保存很多内部状态,自己保存的化需要处理之间的数据一致性问题,如果服务无状态(少量状态)则状态管理可以交给分布式协调服务(比如zk)来处理,进而可以很容易的扩缩容(启动新节点、下线节点)等。

而kafka就是使用zk来保管其服务节点状态(也并非使用zk来保管其所有状态,kafka节点还会自己保存少量状态)。

适用于如下需求场景:

  • 基于hadoop的批处理系统、
  • 低延迟的实时系统、
  • Storm/Spark流式处理引擎,
  • 监控日志收集
  • 消息服务

设计一个消息系统需要关注两方面因素:

  1. 消息体的设计:常见的消息体格式有xml,json、二进制等
  2. 传输协议的设计:常见的消息传输有两种形式 消息队列形式、发布/订阅模式

kafka采用的消息体是二进制的形式,传输协议是基于发布/订阅

2. kafka单节点使用

2.1 搭建环境

​ 说明:此处kafka的版本是2.10版本,所有环境搭建和使用均基于该版本进行(其他版本可能会命令会有不同,其他版本自行百度)

#kafka是使用scala语言开发(运行在jvm上) 需要确保环境安装了jdk
#同时需要确保安装了zk (kafka需要注册到zk上使用)

#1、下载kafka kafka_2.11-2.10.tgz 2.11是scala版本  2.10是kafka的版本
 wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz

#2、解压文件 
 tar -zxvf /home/kafka_2.12-3.0.0.tgz -C /usr/local/
 
#3、 启动kafka,运行日志在logs目录的server.log文件里
#后台启动,不会打印日志到控制台
 ./bin/kafka-server-start.sh -daemon config/server.properties  
#或者用
 bin/kafka-server-start.sh config/server.properties &

# 我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树
 bin/zkCli.sh 
#查看zk的根目录kafka相关节点
 ls /brokers/ids	#查看kafka节点
 [zk: localhost:2181(CONNECTED) 5] ls /brokers/ids 
 [0]  #显示出现brokers id为0的节点已经在zk上创建(与我们server.properties配置的brokers.id=0一致)

#4、停止kafka
 bin/kafka-server-stop.sh

出现问题

启动kafka 因为分配内存报错(学习使用的是阿里云 运行总内存1G ),因为kafka中启动脚本默认分配内存大小为1G所以报内存分配不足的错误
在这里插入图片描述

需要调整kafka-server-start-sh中的启动命令,调整后如下:
在这里插入图片描述

server.properties核心配置详解:

Property Default Description
broker.id 0 每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。
log.dirs /tmp/kafka-logs kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。
listeners PLAINTEXT://ip:9092 server接受客户端连接的端口,ip配置kafka本机ip即可
zookeeper.connect localhost:2181 zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为 hostname1:port1, hostname2:port2, hostname3:port3
log.retention.hours 168 每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样。
num.partitions 1 创建topic的默认分区数
default.replication.factor 1 自动创建topic的默认副本数量,建议设置为大于等于2
min.insync.replicas 1 当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常
delete.topic.enable false 是否允许删除主题

在这里插入图片描述

2.2 topic 主题

#一、 2.2之前的版本 Kafka 使用如下明细
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

#二、 查看kafka存在的topic
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

#三、 删除topic
bin/kafka-topics.sh --delete --topic test --zookeeper 127.0.0.1:2181

创建topic

在这里插入图片描述
查看topic
在这里插入图片描述

2.3 生产/消费消息

# 开启生产者 以命令行的方式
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test 

# 订阅topic中最新的消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
# 消费(订阅)topic中所有消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test

#消费多个主题
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --whitelist "test|test-2"

2.3.1 单播消费

​ 单播消费是多个消费者只有一个消费成功,类似于队列queue形式,只需让所有消费者在同一个消费组里即可

# 创建多个消费者时候使用--consumer-property group.id=${消费组标识}  设置同一个消费组
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --consumer-property group.id=testGroup --topic test 

2.3.2 多播消费

一条消息能被多个消费者消费的模式,类似publish-subscribe模式费,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。

# 创建多个消费者时候使用--consumer-property group.id=${消费组标识}  设置同一个消费组
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --consumer-property group.id=testGroup2 --topic test 

#查看消息分组
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list 

2.3.3 消费组信息

#查看消费组详情信息
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testGroup

在这里插入图片描述

属性 说明
TOPIC 该消费组需要消费的主题(topic)
PARTITION 该消费组需要消费的分区 Kafka中的一个topic出于性能考虑每个kafkatopic都有若干个partition组成。kafka的Partition实际上没有太多的业务含义,它的引入就是单纯地为了提升系统的吞吐量
CURRENT-OFFSET 当前消费组的已消费偏移量
LOG_END_OFFSET 主题对应分区消息的结束偏移量(HW)
LAG 当前消费组未消费的消息数
CONSUMER-ID 消费者id

2.4 TOPIC概念

# 查看topic信息
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test1

# 增加topic的分区信息
bin/kafka-topics.sh -alter --partitions 3 --zookeeper 127.0.0.1:2181 --topic test

在这里插入图片描述

第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。

  • leader节点负责给定partition的所有读写请求。
  • replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
  • isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。
  1. Topic: 是一个类别的名称,同类消息发送到同一个Topic下面,为了提高系统吞吐量(并行度)topic下分为多个多个分区(Partition)日志文件

  2. Partition: 分区(物理概念),topic内部划分多个partition来分片存储数据,不同的partition可以位于不同的机器上进行分布式存储,分区对应一个commit log文件(.log结尾的文件),里面包含有序的message序列。

  3. message: kakfa中的消息 每个消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message,每个consumer是基于自己在commit log中的消费进度(offset)来进行工作的。在kafka中消费offset由consumer自己来维护,通过指定offset可以来重复消费或者跳过某些消息。

2.5. 消息设计

​ kafka的消息格式的设计采用如下

  1. CRC (4B)

  2. version (1B)

  3. 版本号

  4. 属性(1B)保存消息的压缩类型

  5. 时间戳(8B)

  6. key的长度 (4B)

  7. key (字节不固定)

  8. value长度

  9. value值(消息值)

在这里插入图片描述
同时该消息体使用紧凑的二进制数组来避免存储空间的浪费。

2.6 replica副本

​ 创建topic的时候可以通过修改replica的数量备份多份保证数据的可靠性,同时多个副本一定是保存在不同的broker上的但是只有leader对应的broker的副本是负责响应消息写入和消费的。flower对应的副本是有主leader宕机的时候被选举为主的broker才会进行处理消息写入和消费。
在这里插入图片描述

  1. kafka为partition动态维护了一个replica集合叫做ISR(in-sync replica)与leader 的保持同步的replica集合。只有isr中的所有replica都收到消息,kafka才将该消息置为已提交的状态,消息才能不丢失。
  2. kafka会将isr中与leader的replica数据落后太多的replica自动剔除,如果剔除的replica数据重新‘追上’主 则会被自动的加入。

2.7 kafka的使用场景

消息队列

​ kafka是以消息引擎闻名,所有可以当做mq来进行解耦生产者和消费者和批量处理消息。kafka的高吞吐量、分区和副本机制保证了消息传输的可靠性和高容错性。为实现一个大数据量的消息处理应用提供了很好的基础。

日志收集

  • 网站行为日志追踪 很多网站的用户点击操点击流数据量很大使用高吞吐性能的kafkak收集信息后续进行数据处理或者机器学习分析用户行为。
  • 审计数据日志收集
  • 分布式日志收集

流式处理

kafka推出了一个流式处理框架 kafka stream 可以用来像spark stream、apache flink一样处理流式数据。

3. kafka集群使用

在这里插入图片描述

3.1 集群搭建

对于kafka来说,一个单独的broker意味着kafka集群中只有一个节点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可,这里启动三个实例

# 复制两个配置
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

修改配置文件

#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://9093   
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=127.0.0.1:2181
# 启动两个节点
bin/kafka-server-start.sh  config/server1.properties &
bin/kafka-server-start.sh  config/server2.properties &


#创建一个新的topic,副本数设置为3,分区数设置为2:
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic

#查看下topic的情况
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-replicated-topic

在这里插入图片描述
以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。

  • leader节点负责给定partition的所有读写请求,同一个主题不同分区leader副本一般不一样(为了容灾)
  • replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
  • isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。
#集群发送消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic my-replicated-topic

#集群消费消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --from-beginning --topic my-replicated-topic

生产者生产消息到kafka
在这里插入图片描述

消费者消费kafka集群消费

在这里插入图片描述

3.2 kafka集群特性

kafka的容错性

my-replicated-topic的信息
在这里插入图片描述
查看到 my-replicated-topic 有两个分区 partition 0和partition 1 对该分区的进行读写请求的是brokerId=3,这里杀死brokerId=3的进行再次查看my-replicated-topic 如下
在这里插入图片描述
对my-replicated-topic 两个分区的读写变成了brokerid = 1和brokerId =0 且能正常的生产和消费消息。kafka将很多集群关键信息记录在zookeeper里,保证自己的无状态,从而在水平扩容时非常方便。

kafka集群数据同步流程

  1. topic的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。

  2. 针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。

  3. leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果,不提供读写(主要是为了保证多副本数据与消费的一致性)。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。、

  4. 生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过round-robin做简单的负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。

kafka集群情况下的消费顺序

  1. 一个partition同一个时刻在一个consumer group中只能有一个consumer instance在消费,从而保证消费顺序。

  2. consumer group中的consumer instance的数量不能比一个Topic中的partition的数量多,否则,多出来的consumer消费不到消息。

  3. Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>