kafka学习之一简单安装以及使用
文章目录
1. 初识kafka框架
kafka是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统(所谓消息系统:用于在不同应用之间传输数据的消息引擎系统)。
其设置初衷是为了解决互联网公司超大量级数据的实时传输,其具备如下能力
- 高吞吐量/低延时
- 消息持久化
- 负载均衡和故障转移
- 伸缩性
高吞吐量/低延时
-
kafka消息持久化大量使用页缓存,读写消息大概率可以命中操作系统页缓存(此处需要明晰:kafka的消息持久化是直接追加到磁盘上的(磁盘为了存储快捷使用了页缓存))
-
kafka不直接参与物理IO,而是由操作系统进行操作
-
写消息采用追加的方式,使用了磁盘顺序写(避免了磁盘随机写性能慢的问题)
-
读消息使用了liunx的sendfile的零拷贝技术
消息持久化
kafka框架设计的时候和别的mq框架不同,别的mq框架对于消息先使用缓存(内存),在一定时机再将缓存持久化到磁盘,而是一反常态直接使用追加写>(利用了磁盘的属性写和内存写性能相同的场景)。 kafka消息持久化是直接持久化到磁盘(顺序写),并没有采用先缓存到内存进而持久化到磁盘,因为顺序写到磁盘的速度和内存随机写的速度相当。
负载均衡和故障转移
kafka实现负载均衡是通过分区领导者选举(partition leader election)来实现的,使得机器上以均等的机会分散各个partition的leader从而整体上实现负载均衡。
kafka实现故障转移使用所有服务节点都会以会话的形式将自己注册到zk上,服务器出现宕机情况则会出现会话超时失效,此时kafka集群会选举出新的机器来提供服务。
伸缩性
伸缩性是指向某个分布式系统添加(伸)和去除(缩)服务资源从而动态改变其吞吐量的能力。阻碍线性扩容的常见因素之一是服务节点之间状态的保存,服务器之间需要保存很多内部状态,自己保存的化需要处理之间的数据一致性问题,如果服务无状态(少量状态)则状态管理可以交给分布式协调服务(比如zk)来处理,进而可以很容易的扩缩容(启动新节点、下线节点)等。
而kafka就是使用zk来保管其服务节点状态(也并非使用zk来保管其所有状态,kafka节点还会自己保存少量状态)。
适用于如下需求场景:
- 基于hadoop的批处理系统、
- 低延迟的实时系统、
- Storm/Spark流式处理引擎,
- 监控日志收集
- 消息服务
设计一个消息系统需要关注两方面因素:
- 消息体的设计:常见的消息体格式有xml,json、二进制等
- 传输协议的设计:常见的消息传输有两种形式 消息队列形式、发布/订阅模式
kafka采用的消息体是二进制的形式,传输协议是基于发布/订阅
2. kafka单节点使用
2.1 搭建环境
说明:此处kafka的版本是2.10版本,所有环境搭建和使用均基于该版本进行(其他版本可能会命令会有不同,其他版本自行百度)
#kafka是使用scala语言开发(运行在jvm上) 需要确保环境安装了jdk
#同时需要确保安装了zk (kafka需要注册到zk上使用)
#1、下载kafka kafka_2.11-2.10.tgz 2.11是scala版本 2.10是kafka的版本
wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz
#2、解压文件
tar -zxvf /home/kafka_2.12-3.0.0.tgz -C /usr/local/
#3、 启动kafka,运行日志在logs目录的server.log文件里
#后台启动,不会打印日志到控制台
./bin/kafka-server-start.sh -daemon config/server.properties
#或者用
bin/kafka-server-start.sh config/server.properties &
# 我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树
bin/zkCli.sh
#查看zk的根目录kafka相关节点
ls /brokers/ids #查看kafka节点
[zk: localhost:2181(CONNECTED) 5] ls /brokers/ids
[0] #显示出现brokers id为0的节点已经在zk上创建(与我们server.properties配置的brokers.id=0一致)
#4、停止kafka
bin/kafka-server-stop.sh
出现问题:
启动kafka 因为分配内存报错(学习使用的是阿里云 运行总内存1G ),因为kafka中启动脚本默认分配内存大小为1G所以报内存分配不足的错误
需要调整kafka-server-start-sh中的启动命令,调整后如下:
server.properties核心配置详解:
Property | Default | Description |
---|---|---|
broker.id | 0 | 每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。 |
log.dirs | /tmp/kafka-logs | kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。 |
listeners | PLAINTEXT://ip:9092 | server接受客户端连接的端口,ip配置kafka本机ip即可 |
zookeeper.connect | localhost:2181 | zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为 hostname1:port1, hostname2:port2, hostname3:port3 |
log.retention.hours | 168 | 每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样。 |
num.partitions | 1 | 创建topic的默认分区数 |
default.replication.factor | 1 | 自动创建topic的默认副本数量,建议设置为大于等于2 |
min.insync.replicas | 1 | 当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常 |
delete.topic.enable | false | 是否允许删除主题 |
2.2 topic 主题
#一、 2.2之前的版本 Kafka 使用如下明细
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
#二、 查看kafka存在的topic
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
#三、 删除topic
bin/kafka-topics.sh --delete --topic test --zookeeper 127.0.0.1:2181
创建topic
查看topic
2.3 生产/消费消息
# 开启生产者 以命令行的方式
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
# 订阅topic中最新的消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
# 消费(订阅)topic中所有消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
#消费多个主题
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --whitelist "test|test-2"
2.3.1 单播消费
单播消费是多个消费者只有一个消费成功,类似于队列queue形式,只需让所有消费者在同一个消费组里即可
# 创建多个消费者时候使用--consumer-property group.id=${消费组标识} 设置同一个消费组
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer-property group.id=testGroup --topic test
2.3.2 多播消费
一条消息能被多个消费者消费的模式,类似publish-subscribe模式费,针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。
# 创建多个消费者时候使用--consumer-property group.id=${消费组标识} 设置同一个消费组
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --consumer-property group.id=testGroup2 --topic test
#查看消息分组
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
2.3.3 消费组信息
#查看消费组详情信息
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testGroup
属性 | 说明 |
---|---|
TOPIC | 该消费组需要消费的主题(topic) |
PARTITION | 该消费组需要消费的分区 Kafka中的一个topic出于性能考虑每个kafka的topic都有若干个partition组成。kafka的Partition实际上没有太多的业务含义,它的引入就是单纯地为了提升系统的吞吐量 |
CURRENT-OFFSET | 当前消费组的已消费偏移量 |
LOG_END_OFFSET | 主题对应分区消息的结束偏移量(HW) |
LAG | 当前消费组未消费的消息数 |
CONSUMER-ID | 消费者id |
2.4 TOPIC概念
# 查看topic信息
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test1
# 增加topic的分区信息
bin/kafka-topics.sh -alter --partitions 3 --zookeeper 127.0.0.1:2181 --topic test
第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。
- leader节点负责给定partition的所有读写请求。
- replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
- isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。
-
Topic: 是一个类别的名称,同类消息发送到同一个Topic下面,为了提高系统吞吐量(并行度)topic下分为多个多个分区(Partition)日志文件
-
Partition: 分区(物理概念),topic内部划分多个partition来分片存储数据,不同的partition可以位于不同的机器上进行分布式存储,分区对应一个commit log文件(.log结尾的文件),里面包含有序的message序列。
-
message: kakfa中的消息 每个消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message,每个consumer是基于自己在commit log中的消费进度(offset)来进行工作的。在kafka中消费offset由consumer自己来维护,通过指定offset可以来重复消费或者跳过某些消息。
2.5. 消息设计
kafka的消息格式的设计采用如下
-
CRC (4B)
-
version (1B)
-
版本号
-
属性(1B)保存消息的压缩类型
-
时间戳(8B)
-
key的长度 (4B)
-
key (字节不固定)
-
value长度
-
value值(消息值)
同时该消息体使用紧凑的二进制数组来避免存储空间的浪费。
2.6 replica副本
创建topic的时候可以通过修改replica的数量备份多份保证数据的可靠性,同时多个副本一定是保存在不同的broker上的但是只有leader对应的broker的副本是负责响应消息写入和消费的。flower对应的副本是有主leader宕机的时候被选举为主的broker才会进行处理消息写入和消费。
- kafka为partition动态维护了一个replica集合叫做ISR(in-sync replica)与leader 的保持同步的replica集合。只有isr中的所有replica都收到消息,kafka才将该消息置为已提交的状态,消息才能不丢失。
- kafka会将isr中与leader的replica数据落后太多的replica自动剔除,如果剔除的replica数据重新‘追上’主 则会被自动的加入。
2.7 kafka的使用场景
消息队列
kafka是以消息引擎闻名,所有可以当做mq来进行解耦生产者和消费者和批量处理消息。kafka的高吞吐量、分区和副本机制保证了消息传输的可靠性和高容错性。为实现一个大数据量的消息处理应用提供了很好的基础。
日志收集
- 网站行为日志追踪 很多网站的用户点击操点击流数据量很大使用高吞吐性能的kafkak收集信息后续进行数据处理或者机器学习分析用户行为。
- 审计数据日志收集
- 分布式日志收集
流式处理
kafka推出了一个流式处理框架 kafka stream 可以用来像spark stream、apache flink一样处理流式数据。
3. kafka集群使用
3.1 集群搭建
对于kafka来说,一个单独的broker意味着kafka集群中只有一个节点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可,这里启动三个实例
# 复制两个配置
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
修改配置文件
#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://9093
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=127.0.0.1:2181
# 启动两个节点
bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
#创建一个新的topic,副本数设置为3,分区数设置为2:
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
#查看下topic的情况
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic my-replicated-topic
以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。
- leader节点负责给定partition的所有读写请求,同一个主题不同分区leader副本一般不一样(为了容灾)
- replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
- isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。
#集群发送消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic my-replicated-topic
#集群消费消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --from-beginning --topic my-replicated-topic
生产者生产消息到kafka
消费者消费kafka集群消费
3.2 kafka集群特性
kafka的容错性
my-replicated-topic的信息
查看到 my-replicated-topic 有两个分区 partition 0和partition 1 对该分区的进行读写请求的是brokerId=3,这里杀死brokerId=3的进行再次查看my-replicated-topic 如下
对my-replicated-topic 两个分区的读写变成了brokerid = 1和brokerId =0 且能正常的生产和消费消息。kafka将很多集群关键信息记录在zookeeper里,保证自己的无状态,从而在水平扩容时非常方便。
kafka集群数据同步流程
-
topic的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。
-
针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。
-
leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果,不提供读写(主要是为了保证多副本数据与消费的一致性)。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。、
-
生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过round-robin做简单的负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。
kafka集群情况下的消费顺序
-
一个partition同一个时刻在一个consumer group中只能有一个consumer instance在消费,从而保证消费顺序。
-
consumer group中的consumer instance的数量不能比一个Topic中的partition的数量多,否则,多出来的consumer消费不到消息。
-
Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。